#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD-2-Clause
#

"""Google Gemini Live API service implementation.

This module provides real-time conversational AI capabilities using Google's
Gemini Live API, supporting both text and audio modalities with
voice transcription, streaming responses, and tool usage.
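
Example (a minimal sketch; the API key is a placeholder)::

    llm = GeminiLiveLLMService(api_key="YOUR_GEMINI_API_KEY")
    context = OpenAILLMContext([{"role": "user", "content": "Say hello!"}])
    context_aggregator = llm.create_context_aggregator(context)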
"""

import base64
import io
import json
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Union

from loguru import logger
from PIL import Image
from pydantic import BaseModel, Field

from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.adapters.services.gemini_adapter import GeminiLLMAdapter
from pipecat.frames.frames import (
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
    CancelFrame,
    EndFrame,
    ErrorFrame,
    Frame,
    InputAudioRawFrame,
    InputImageRawFrame,
    InputTextRawFrame,
    InterruptionFrame,
    LLMContextFrame,
    LLMFullResponseEndFrame,
    LLMFullResponseStartFrame,
    LLMMessagesAppendFrame,
    LLMSetToolsFrame,
    LLMTextFrame,
    LLMUpdateSettingsFrame,
    StartFrame,
    TranscriptionFrame,
    TTSAudioRawFrame,
    TTSStartedFrame,
    TTSStoppedFrame,
    TTSTextFrame,
    UserImageRawFrame,
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_response import (
    LLMAssistantAggregatorParams,
    LLMUserAggregatorParams,
)
from pipecat.processors.aggregators.openai_llm_context import (
    OpenAILLMContext,
    OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.google.frames import LLMSearchOrigin, LLMSearchResponseFrame, LLMSearchResult
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.services.openai.llm import (
    OpenAIAssistantContextAggregator,
    OpenAIUserContextAggregator,
)
from pipecat.transcriptions.language import Language
from pipecat.utils.string import match_endofsentence
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_gemini_live, traced_stt

from .file_api import GeminiFileAPI

try:
    from google.genai import Client
    from google.genai.live import AsyncSession
    from google.genai.types import (
        AudioTranscriptionConfig,
        AutomaticActivityDetection,
        Blob,
        Content,
        ContextWindowCompressionConfig,
        EndSensitivity,
        FileData,
        FunctionResponse,
        GenerationConfig,
        GroundingMetadata,
        HttpOptions,
        LiveConnectConfig,
        LiveServerMessage,
        MediaResolution,
        Modality,
        Part,
        ProactivityConfig,
        RealtimeInputConfig,
        SessionResumptionConfig,
        SlidingWindow,
        SpeechConfig,
        StartSensitivity,
        ThinkingConfig,
        VoiceConfig,
    )
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error("In order to use Google AI, you need to `pip install pipecat-ai[google]`.")
    raise Exception(f"Missing module: {e}")


# Connection management constants
MAX_CONSECUTIVE_FAILURES = 3
CONNECTION_ESTABLISHED_THRESHOLD = 10.0  # seconds


def language_to_gemini_language(language: Language) -> Optional[str]:
    """Maps a Language enum value to a Gemini Live supported language code.

    Supported language codes are listed in the Gemini Live API documentation:
    https://ai.google.dev/gemini-api/docs/live

    Args:
        language: The language enum value to convert.

    Returns:
        The Gemini language code string, or None if the language is not supported.
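
    Example::

        >>> language_to_gemini_language(Language.FR_CA)
        'fr-CA'
        >>> language_to_gemini_language(Language.ZH)  # general Chinese maps to Mandarin
        'cmn-CN'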
    """
    language_map = {
        # Arabic
        Language.AR: "ar-XA",
        # Bengali
        Language.BN_IN: "bn-IN",
        # Chinese (Mandarin)
        Language.CMN: "cmn-CN",
        Language.CMN_CN: "cmn-CN",
        Language.ZH: "cmn-CN",  # Map general Chinese to Mandarin for Gemini
        Language.ZH_CN: "cmn-CN",  # Map Simplified Chinese to Mandarin for Gemini
        # German
        Language.DE: "de-DE",
        Language.DE_DE: "de-DE",
        # English
        Language.EN: "en-US",  # Default to US English (though not explicitly listed in supported codes)
        Language.EN_US: "en-US",
        Language.EN_AU: "en-AU",
        Language.EN_GB: "en-GB",
        Language.EN_IN: "en-IN",
        # Spanish
        Language.ES: "es-ES",  # Default to Spain Spanish
        Language.ES_ES: "es-ES",
        Language.ES_US: "es-US",
        # French
        Language.FR: "fr-FR",  # Default to France French
        Language.FR_FR: "fr-FR",
        Language.FR_CA: "fr-CA",
        # Gujarati
        Language.GU: "gu-IN",
        Language.GU_IN: "gu-IN",
        # Hindi
        Language.HI: "hi-IN",
        Language.HI_IN: "hi-IN",
        # Indonesian
        Language.ID: "id-ID",
        Language.ID_ID: "id-ID",
        # Italian
        Language.IT: "it-IT",
        Language.IT_IT: "it-IT",
        # Japanese
        Language.JA: "ja-JP",
        Language.JA_JP: "ja-JP",
        # Kannada
        Language.KN: "kn-IN",
        Language.KN_IN: "kn-IN",
        # Korean
        Language.KO: "ko-KR",
        Language.KO_KR: "ko-KR",
        # Malayalam
        Language.ML: "ml-IN",
        Language.ML_IN: "ml-IN",
        # Marathi
        Language.MR: "mr-IN",
        Language.MR_IN: "mr-IN",
        # Dutch
        Language.NL: "nl-NL",
        Language.NL_NL: "nl-NL",
        # Polish
        Language.PL: "pl-PL",
        Language.PL_PL: "pl-PL",
        # Portuguese (Brazil)
        Language.PT_BR: "pt-BR",
        # Russian
        Language.RU: "ru-RU",
        Language.RU_RU: "ru-RU",
        # Tamil
        Language.TA: "ta-IN",
        Language.TA_IN: "ta-IN",
        # Telugu
        Language.TE: "te-IN",
        Language.TE_IN: "te-IN",
        # Thai
        Language.TH: "th-TH",
        Language.TH_TH: "th-TH",
        # Turkish
        Language.TR: "tr-TR",
        Language.TR_TR: "tr-TR",
        # Vietnamese
        Language.VI: "vi-VN",
        Language.VI_VN: "vi-VN",
    }
    return language_map.get(language)


class GeminiLiveContext(OpenAILLMContext):
    """Extended OpenAI context for Gemini Live API.

    Provides Gemini-specific context management including system instruction
    extraction and message format conversion for the Live API.
    """

    @staticmethod
    def upgrade(obj: OpenAILLMContext) -> "GeminiLiveContext":
        """Upgrade an OpenAI context to Gemini context.

        Args:
            obj: The OpenAI context to upgrade.

        Returns:
            The upgraded Gemini context instance.
        """
        if isinstance(obj, OpenAILLMContext) and not isinstance(obj, GeminiLiveContext):
            logger.debug(f"Upgrading to Gemini Live Context: {obj}")
            obj.__class__ = GeminiLiveContext
            obj._restructure_from_openai_messages()
        return obj

    def _restructure_from_openai_messages(self):
        pass

    def extract_system_instructions(self):
        """Extract system instructions from context messages.

        Returns:
            Combined system instruction text from all system messages.
        """
        system_instruction = ""
        for item in self.messages:
            if item.get("role") == "system":
                content = item.get("content", "")
                if content:
                    if system_instruction and not system_instruction.endswith("\n"):
                        system_instruction += "\n"
                    system_instruction += str(content)
        return system_instruction

    def add_file_reference(self, file_uri: str, mime_type: str, text: Optional[str] = None):
        """Add a file reference to the context.

        This adds a user message with a file reference that will be sent during context initialization.

        Args:
            file_uri: URI of the uploaded file.
            mime_type: MIME type of the file.
            text: Optional text prompt to accompany the file.
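
        Example (a sketch; the file URI is a placeholder of the kind returned
        by the Gemini File API)::

            context.add_file_reference(
                file_uri="https://generativelanguage.googleapis.com/v1beta/files/abc-123",
                mime_type="application/pdf",
                text="Summarize this document.",
            )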
        """
        # Create parts list with file reference
        parts = []
        if text:
            parts.append({"type": "text", "text": text})

        # Add file reference part
        parts.append(
            {"type": "file_data", "file_data": {"mime_type": mime_type, "file_uri": file_uri}}
        )

        # Add to messages
        message = {"role": "user", "content": parts}
        self.messages.append(message)
        logger.info(f"Added file reference to context: {file_uri}")

    def get_messages_for_initializing_history(self) -> List[Content]:
        """Get messages formatted for Gemini history initialization.

        Returns:
            List of messages in Gemini format for conversation history.
        """
        messages: List[Content] = []
        for item in self.messages:
            role = item.get("role")

            if role == "system":
                continue

            elif role == "assistant":
                role = "model"

            content = item.get("content")
            parts: List[Part] = []
            if isinstance(content, str):
                parts = [Part(text=content)]
            elif isinstance(content, list):
                for part in content:
                    if part.get("type") == "text":
                        parts.append(Part(text=part.get("text")))
                    elif part.get("type") == "file_data":
                        file_data = part.get("file_data", {})
                        parts.append(
                            Part(
                                file_data=FileData(
                                    mime_type=file_data.get("mime_type"),
                                    file_uri=file_data.get("file_uri"),
                                )
                            )
                        )
                    else:
                        logger.warning(f"Unsupported content type: {str(part)[:80]}")
            else:
                logger.warning(f"Unsupported content type: {str(content)[:80]}")
            messages.append(Content(role=role, parts=parts))
        return messages


class GeminiLiveUserContextAggregator(OpenAIUserContextAggregator):
    """User context aggregator for Gemini Live.

    Extends OpenAI user aggregator to handle Gemini-specific message passing
    while maintaining compatibility with the standard aggregation pipeline.
    """

    async def process_frame(self, frame, direction):
        """Process incoming frames for user context aggregation.

        Args:
            frame: The frame to process.
            direction: The frame processing direction.
        """
        await super().process_frame(frame, direction)
        # kind of a hack just to pass the LLMMessagesAppendFrame through, but it's fine for now
        if isinstance(frame, LLMMessagesAppendFrame):
            await self.push_frame(frame, direction)


class GeminiLiveAssistantContextAggregator(OpenAIAssistantContextAggregator):
    """Assistant context aggregator for Gemini Live.

    Handles assistant response aggregation while filtering out LLMTextFrames
    to prevent duplicate context entries, as Gemini Live pushes both
    LLMTextFrames and TTSTextFrames.
    """

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process incoming frames for assistant context aggregation.

        Args:
            frame: The frame to process.
            direction: The frame processing direction.
        """
        # The LLMAssistantContextAggregator uses TextFrames to aggregate the LLM
        # output, but GeminiLiveLLMService pushes both LLMTextFrames and
        # TTSTextFrames. We need to override process_frame to drop LLMTextFrames so
        # that only the TTSTextFrames are processed. This ensures that the context
        # gets only one set of messages.
        if not isinstance(frame, LLMTextFrame):
            await super().process_frame(frame, direction)

    async def handle_user_image_frame(self, frame: UserImageRawFrame):
        """Handle user image frames.

        Args:
            frame: The user image frame to handle.
        """
        # We don't want to store any images in the context. Revisit this later
        # when the API evolves.
        pass


@dataclass
class GeminiLiveContextAggregatorPair:
    """Pair of user and assistant context aggregators for Gemini Live.

    Parameters:
        _user: The user context aggregator instance.
        _assistant: The assistant context aggregator instance.
    """

    _user: GeminiLiveUserContextAggregator
    _assistant: GeminiLiveAssistantContextAggregator

    def user(self) -> GeminiLiveUserContextAggregator:
        """Get the user context aggregator.

        Returns:
            The user context aggregator instance.
        """
        return self._user

    def assistant(self) -> GeminiLiveAssistantContextAggregator:
        """Get the assistant context aggregator.

        Returns:
            The assistant context aggregator instance.
        """
        return self._assistant


class GeminiModalities(Enum):
    """Supported modalities for Gemini Live.

    Parameters:
        TEXT: Text responses.
        AUDIO: Audio responses.
    """

    TEXT = "TEXT"
    AUDIO = "AUDIO"


class GeminiMediaResolution(str, Enum):
    """Media resolution options for Gemini Live.

    Parameters:
        UNSPECIFIED: Use default resolution setting.
        LOW: Low resolution with 64 tokens.
        MEDIUM: Medium resolution with 256 tokens.
        HIGH: High resolution with zoomed reframing and 256 tokens.
    """

    UNSPECIFIED = "MEDIA_RESOLUTION_UNSPECIFIED"  # Use default
    LOW = "MEDIA_RESOLUTION_LOW"  # 64 tokens
    MEDIUM = "MEDIA_RESOLUTION_MEDIUM"  # 256 tokens
    HIGH = "MEDIA_RESOLUTION_HIGH"  # Zoomed reframing with 256 tokens


class GeminiVADParams(BaseModel):
    """Voice Activity Detection parameters for Gemini Live.

    Parameters:
        disabled: Whether to disable VAD. Defaults to None.
        start_sensitivity: Sensitivity for speech start detection. Defaults to None.
        end_sensitivity: Sensitivity for speech end detection. Defaults to None.
        prefix_padding_ms: Prefix padding in milliseconds. Defaults to None.
        silence_duration_ms: Silence duration threshold in milliseconds. Defaults to None.
    """

    disabled: Optional[bool] = Field(default=None)
    start_sensitivity: Optional[StartSensitivity] = Field(default=None)
    end_sensitivity: Optional[EndSensitivity] = Field(default=None)
    prefix_padding_ms: Optional[int] = Field(default=None)
    silence_duration_ms: Optional[int] = Field(default=None)


class ContextWindowCompressionParams(BaseModel):
    """Parameters for context window compression in Gemini Live.

    Parameters:
        enabled: Whether compression is enabled. Defaults to False.
        trigger_tokens: Token count to trigger compression. None uses 80% of context window.
    """

    enabled: bool = Field(default=False)
    trigger_tokens: Optional[int] = Field(
        default=None
    )  # None = use default (80% of context window)


class InputParams(BaseModel):
    """Input parameters for Gemini Live generation.

    Parameters:
        frequency_penalty: Frequency penalty for generation (0.0-2.0). Defaults to None.
        max_tokens: Maximum tokens to generate. Must be >= 1. Defaults to 4096.
        presence_penalty: Presence penalty for generation (0.0-2.0). Defaults to None.
        temperature: Sampling temperature (0.0-2.0). Defaults to None.
        top_k: Top-k sampling parameter. Must be >= 0. Defaults to None.
        top_p: Top-p sampling parameter (0.0-1.0). Defaults to None.
        modalities: Response modalities. Defaults to AUDIO.
        language: Language for generation. Defaults to EN_US.
        media_resolution: Media resolution setting. Defaults to UNSPECIFIED.
        vad: Voice activity detection parameters. Defaults to None.
        context_window_compression: Context compression settings. Defaults to None.
        thinking: Thinking settings. Defaults to None.
            Note that these settings may require specifying a model that
            supports them, e.g. "gemini-2.5-flash-native-audio-preview-09-2025".
        enable_affective_dialog: Enable affective dialog, which allows Gemini
            to adapt to expression and tone. Defaults to None.
            Note that these settings may require specifying a model that
            supports them, e.g. "gemini-2.5-flash-native-audio-preview-09-2025".
            Also note that this setting may require specifying an API version that
            supports it, e.g. HttpOptions(api_version="v1alpha").
        proactivity: Proactivity settings, which allows Gemini to proactively
            decide how to behave, such as whether to avoid responding to
            content that is not relevant. Defaults to None.
            Note that these settings may require specifying a model that
            supports them, e.g. "gemini-2.5-flash-native-audio-preview-09-2025".
            Also note that this setting may require specifying an API version that
            supports it, e.g. HttpOptions(api_version="v1alpha").
        extra: Additional parameters. Defaults to empty dict.
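
    Example (a sketch; the values are illustrative, and `EndSensitivity` comes
    from `google.genai.types`)::

        params = InputParams(
            temperature=0.7,
            modalities=GeminiModalities.AUDIO,
            language=Language.ES_ES,
            vad=GeminiVADParams(
                end_sensitivity=EndSensitivity.END_SENSITIVITY_HIGH,
                silence_duration_ms=500,
            ),
            context_window_compression=ContextWindowCompressionParams(enabled=True),
        )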
    """

    frequency_penalty: Optional[float] = Field(default=None, ge=0.0, le=2.0)
    max_tokens: Optional[int] = Field(default=4096, ge=1)
    presence_penalty: Optional[float] = Field(default=None, ge=0.0, le=2.0)
    temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0)
    top_k: Optional[int] = Field(default=None, ge=0)
    top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0)
    modalities: Optional[GeminiModalities] = Field(default=GeminiModalities.AUDIO)
    language: Optional[Language] = Field(default=Language.EN_US)
    media_resolution: Optional[GeminiMediaResolution] = Field(
        default=GeminiMediaResolution.UNSPECIFIED
    )
    vad: Optional[GeminiVADParams] = Field(default=None)
    context_window_compression: Optional[ContextWindowCompressionParams] = Field(default=None)
    thinking: Optional[ThinkingConfig] = Field(default=None)
    enable_affective_dialog: Optional[bool] = Field(default=None)
    proactivity: Optional[ProactivityConfig] = Field(default=None)
    extra: Optional[Dict[str, Any]] = Field(default_factory=dict)


class GeminiLiveLLMService(LLMService):
    """Provides access to Google's Gemini Live API.

    This service enables real-time conversations with Gemini, supporting both
    text and audio modalities. It handles voice transcription, streaming audio
    responses, and tool usage.
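
    Example (a minimal sketch; the API key and system instruction are
    placeholders)::

        llm = GeminiLiveLLMService(
            api_key="YOUR_GEMINI_API_KEY",
            voice_id="Puck",
            system_instruction="You are a helpful assistant.",
            params=InputParams(modalities=GeminiModalities.AUDIO),
        )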
    """

    # Overriding the default adapter to use the Gemini one.
    adapter_class = GeminiLLMAdapter

    def __init__(
        self,
        *,
        api_key: str,
        base_url: Optional[str] = None,
        model="models/gemini-2.0-flash-live-001",
        voice_id: str = "Charon",
        start_audio_paused: bool = False,
        start_video_paused: bool = False,
        system_instruction: Optional[str] = None,
        tools: Optional[Union[List[dict], ToolsSchema]] = None,
        params: Optional[InputParams] = None,
        inference_on_context_initialization: bool = True,
        file_api_base_url: str = "https://generativelanguage.googleapis.com/v1beta/files",
        http_options: Optional[HttpOptions] = None,
        **kwargs,
    ):
        """Initialize the Gemini Live LLM service.

        Args:
            api_key: Google AI API key for authentication.
            base_url: API endpoint base URL. Defaults to the official Gemini Live endpoint.

                .. deprecated:: 0.0.90
                    This parameter is deprecated and no longer has any effect.
                    Please use `http_options` to customize requests made by the
                    API client.

            model: Model identifier to use. Defaults to "models/gemini-2.0-flash-live-001".
            voice_id: TTS voice identifier. Defaults to "Charon".
            start_audio_paused: Whether to start with audio input paused. Defaults to False.
            start_video_paused: Whether to start with video input paused. Defaults to False.
            system_instruction: System prompt for the model. Defaults to None.
            tools: Tools/functions available to the model. Defaults to None.
            params: Configuration parameters for the model. Defaults to InputParams().
            inference_on_context_initialization: Whether to generate a response when context
                is first set. Defaults to True.
            file_api_base_url: Base URL for the Gemini File API. Defaults to the official endpoint.
            http_options: HTTP options for the client.
            **kwargs: Additional arguments passed to parent LLMService.
        """
        # Check for deprecated parameter usage
        if base_url is not None:
            import warnings

            with warnings.catch_warnings():
                warnings.simplefilter("always")
                warnings.warn(
                    "Parameter 'base_url' is deprecated and no longer has any effect. Please use 'http_options' to customize requests made by the API client.",
                    DeprecationWarning,
                    stacklevel=2,
                )

        super().__init__(base_url=base_url, **kwargs)

        params = params or InputParams()

        self._last_sent_time = 0
        self._base_url = base_url
        self.set_model_name(model)
        self._voice_id = voice_id

        self._system_instruction = system_instruction
        self._tools = tools
        self._inference_on_context_initialization = inference_on_context_initialization
        self._needs_turn_complete_message = False

        self._audio_input_paused = start_audio_paused
        self._video_input_paused = start_video_paused
        self._context = None
        self._api_key = api_key
        self._http_options = http_options
        self._session: Optional[AsyncSession] = None
        self._connection_task = None

        self._disconnecting = False
        self._run_llm_when_session_ready = False

        self._user_is_speaking = False
        self._bot_is_speaking = False
        self._user_audio_buffer = bytearray()
        self._user_transcription_buffer = ""
        self._last_transcription_sent = ""
        self._bot_audio_buffer = bytearray()
        self._bot_text_buffer = ""
        self._llm_output_buffer = ""

        self._sample_rate = 24000

        self._language = params.language
        self._language_code = (
            language_to_gemini_language(params.language) if params.language else "en-US"
        )
        self._vad_params = params.vad

        # Reconnection tracking
        self._consecutive_failures = 0
        self._connection_start_time = None

        self._settings = {
            "frequency_penalty": params.frequency_penalty,
            "max_tokens": params.max_tokens,
            "presence_penalty": params.presence_penalty,
            "temperature": params.temperature,
            "top_k": params.top_k,
            "top_p": params.top_p,
            "modalities": params.modalities,
            "language": self._language_code,
            "media_resolution": params.media_resolution,
            "vad": params.vad,
            "context_window_compression": params.context_window_compression.model_dump()
            if params.context_window_compression
            else {},
            "thinking": params.thinking or {},
            "enable_affective_dialog": params.enable_affective_dialog or False,
            "proactivity": params.proactivity or {},
            "extra": params.extra if isinstance(params.extra, dict) else {},
        }

        self._file_api_base_url = file_api_base_url
        self._file_api: Optional[GeminiFileAPI] = None

        # Grounding metadata tracking
        self._search_result_buffer = ""
        self._accumulated_grounding_metadata = None

        # Session resumption
        self._session_resumption_handle: Optional[str] = None

        # Bookkeeping for ending gracefully (i.e. after the bot is finished)
        self._end_frame_pending_bot_turn_finished: Optional[EndFrame] = None

        # Initialize the API client. Subclasses can override this if needed.
        self.create_client()

    def create_client(self):
        """Create the Gemini API client instance. Subclasses can override this."""
        self._client = Client(api_key=self._api_key, http_options=self._http_options)

    @property
    def file_api(self) -> GeminiFileAPI:
        """Get the Gemini File API client instance. Subclasses can override this.

        Returns:
            The Gemini File API client.
        """
        if not self._file_api:
            self._file_api = GeminiFileAPI(api_key=self._api_key, base_url=self._file_api_base_url)
        return self._file_api

    def can_generate_metrics(self) -> bool:
        """Check if the service can generate usage metrics.

        Returns:
            True as Gemini Live supports token usage metrics.
        """
        return True

    def needs_mcp_alternate_schema(self) -> bool:
        """Check if this LLM service requires alternate MCP schema.

        Google/Gemini has stricter JSON schema validation and requires
        certain properties to be removed or modified for compatibility.

        Returns:
            True for Google/Gemini services.
        """
        return True

    def set_audio_input_paused(self, paused: bool):
        """Set the audio input pause state.

        Args:
            paused: Whether to pause audio input.
        """
        self._audio_input_paused = paused

    def set_video_input_paused(self, paused: bool):
        """Set the video input pause state.

        Args:
            paused: Whether to pause video input.
        """
        self._video_input_paused = paused

    def set_model_modalities(self, modalities: GeminiModalities):
        """Set the model response modalities.

        Args:
            modalities: The modalities to use for responses.
        """
        self._settings["modalities"] = modalities

    def set_language(self, language: Language):
        """Set the language for generation.

        Args:
            language: The language to use for generation.
        """
        self._language = language
        self._language_code = language_to_gemini_language(language) or "en-US"
        self._settings["language"] = self._language_code
        logger.info(f"Set Gemini language to: {self._language_code}")

    async def set_context(self, context: OpenAILLMContext):
        """Set the context explicitly from outside the pipeline.

        This is useful when initializing a conversation because, in server-side VAD
        mode, we might not have a way to trigger the pipeline. This sends the history
        to the server. The `inference_on_context_initialization` flag controls whether
        to set the turnComplete flag when we do this. Without that flag, the model
        will not respond, which is often what we want when setting the context at the
        beginning of a conversation.

        Args:
            context: The OpenAI LLM context to set.
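
        Example (a sketch; the message is a placeholder)::

            context = OpenAILLMContext([{"role": "user", "content": "Say hello!"}])
            await llm.set_context(context)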
        """
        if self._context:
            logger.error("Context already set. Can only set up Gemini Live context once.")
            return
        self._context = GeminiLiveContext.upgrade(context)
        await self._create_initial_response()

    #
    # standard AIService frame handling
    #

    async def start(self, frame: StartFrame):
        """Start the service and establish connection.

        Args:
            frame: The start frame.
        """
        await super().start(frame)
        await self._connect()

    async def stop(self, frame: EndFrame):
        """Stop the service and close connections.

        Args:
            frame: The end frame.
        """
        await super().stop(frame)
        await self._disconnect()

    async def cancel(self, frame: CancelFrame):
        """Cancel the service and close connections.

        Args:
            frame: The cancel frame.
        """
        await super().cancel(frame)
        await self._disconnect()

    #
    # speech and interruption handling
    #

    async def _handle_interruption(self):
        await self._set_bot_is_speaking(False)
        await self.push_frame(TTSStoppedFrame())
        await self.push_frame(LLMFullResponseEndFrame())

    async def _handle_user_started_speaking(self, frame):
        self._user_is_speaking = True

    async def _handle_user_stopped_speaking(self, frame):
        self._user_is_speaking = False
        self._user_audio_buffer = bytearray()
        await self.start_ttfb_metrics()
        if self._needs_turn_complete_message:
            self._needs_turn_complete_message = False
            # NOTE: without this, the model ignores the context it's been
            # seeded with before the user started speaking
            await self._session.send_client_content(turn_complete=True)

    #
    # frame processing
    #
    # StartFrame, StopFrame, CancelFrame implemented in base class
    #

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process incoming frames for the Gemini Live service.

        Args:
            frame: The frame to process.
            direction: The frame processing direction.
        """
        # Defer EndFrame handling until after the bot turn is finished
        if isinstance(frame, EndFrame):
            if self._bot_is_speaking:
                logger.debug("Deferring handling EndFrame until bot turn is finished")
                self._end_frame_pending_bot_turn_finished = frame
                return

        await super().process_frame(frame, direction)

        if isinstance(frame, TranscriptionFrame):
            await self.push_frame(frame, direction)
        elif isinstance(frame, OpenAILLMContextFrame):
            context: GeminiLiveContext = GeminiLiveContext.upgrade(frame.context)
            # For now, we'll only trigger inference here when either:
            #   1. We have not seen a context frame before
            #   2. The last message is a tool call result
            if not self._context:
                self._context = context
                if frame.context.tools:
                    self._tools = frame.context.tools
                await self._create_initial_response()
            elif context.messages and context.messages[-1].get("role") == "tool":
                # Support just one tool call per context frame for now
                tool_result_message = context.messages[-1]
                await self._tool_result(tool_result_message)
        elif isinstance(frame, LLMContextFrame):
            raise NotImplementedError("Universal LLMContext is not yet supported for Gemini Live.")
        elif isinstance(frame, InputTextRawFrame):
            await self._send_user_text(frame.text)
            await self.push_frame(frame, direction)
        elif isinstance(frame, InputAudioRawFrame):
            await self._send_user_audio(frame)
            await self.push_frame(frame, direction)
        elif isinstance(frame, InputImageRawFrame):
            await self._send_user_video(frame)
            await self.push_frame(frame, direction)
        elif isinstance(frame, InterruptionFrame):
            await self._handle_interruption()
            await self.push_frame(frame, direction)
        elif isinstance(frame, UserStartedSpeakingFrame):
            await self._handle_user_started_speaking(frame)
            await self.push_frame(frame, direction)
        elif isinstance(frame, UserStoppedSpeakingFrame):
            await self._handle_user_stopped_speaking(frame)
            await self.push_frame(frame, direction)
        elif isinstance(frame, BotStartedSpeakingFrame):
            # Ignore this frame. We use the serverContent API message instead.
            await self.push_frame(frame, direction)
        elif isinstance(frame, BotStoppedSpeakingFrame):
            # Ignore this frame. We use the serverContent.turnComplete API message.
            await self.push_frame(frame, direction)
        elif isinstance(frame, LLMMessagesAppendFrame):
            # NOTE: handling LLMMessagesAppendFrame here in the LLMService is
            # unusual - typically this would be handled in the user context
            # aggregator. Leaving this handling here so that user code that
            # uses this frame *without* a user context aggregator still works
            # (we have an example that does just that, actually).
            await self._create_single_response(frame.messages)
        elif isinstance(frame, LLMUpdateSettingsFrame):
            await self._update_settings(frame.settings)
        elif isinstance(frame, LLMSetToolsFrame):
            await self._update_settings()
        else:
            await self.push_frame(frame, direction)

    async def _set_bot_is_speaking(self, speaking: bool):
        if self._bot_is_speaking == speaking:
            return

        self._bot_is_speaking = speaking

        if not self._bot_is_speaking and self._end_frame_pending_bot_turn_finished:
            await self.queue_frame(self._end_frame_pending_bot_turn_finished)
            self._end_frame_pending_bot_turn_finished = None

    async def _connect(self, session_resumption_handle: Optional[str] = None):
        """Establish client connection to Gemini Live API."""
        if self._session:
            # Here we assume that if we have a session, we are connected. We
            # handle disconnections in the send/recv code paths.
            return

        if session_resumption_handle:
            logger.info(
                f"Connecting to Gemini service with session_resumption_handle: {session_resumption_handle}"
            )
        else:
            logger.info("Connecting to Gemini service")
        try:
            # Assemble basic configuration
            config = LiveConnectConfig(
                generation_config=GenerationConfig(
                    frequency_penalty=self._settings["frequency_penalty"],
                    max_output_tokens=self._settings["max_tokens"],
                    presence_penalty=self._settings["presence_penalty"],
                    temperature=self._settings["temperature"],
                    top_k=self._settings["top_k"],
                    top_p=self._settings["top_p"],
                    response_modalities=[Modality(self._settings["modalities"].value)],
                    speech_config=SpeechConfig(
                        voice_config=VoiceConfig(
                            prebuilt_voice_config={"voice_name": self._voice_id}
                        ),
                        language_code=self._settings["language"],
                    ),
                    media_resolution=MediaResolution(self._settings["media_resolution"].value),
                ),
                input_audio_transcription=AudioTranscriptionConfig(),
                output_audio_transcription=AudioTranscriptionConfig(),
                session_resumption=SessionResumptionConfig(handle=session_resumption_handle),
            )

            # Add context window compression to configuration, if enabled
            if self._settings.get("context_window_compression", {}).get("enabled", False):
                compression_config = ContextWindowCompressionConfig()

                # Add sliding window (always true if compression is enabled)
                compression_config.sliding_window = SlidingWindow()

                # Add trigger_tokens if specified
                trigger_tokens = self._settings.get("context_window_compression", {}).get(
                    "trigger_tokens"
                )
                if trigger_tokens is not None:
                    compression_config.trigger_tokens = trigger_tokens

                config.context_window_compression = compression_config

            # Add thinking configuration to configuration, if provided
            if self._settings.get("thinking"):
                config.thinking_config = self._settings["thinking"]

            # Add affective dialog setting, if provided
            if self._settings.get("enable_affective_dialog", False):
                config.enable_affective_dialog = self._settings["enable_affective_dialog"]

            # Add proactivity configuration to configuration, if provided
            if self._settings.get("proactivity"):
                config.proactivity = self._settings["proactivity"]

            # Add VAD configuration to configuration, if provided
            if self._settings.get("vad"):
                vad_config = AutomaticActivityDetection()
                vad_params = self._settings["vad"]
                has_vad_settings = False

                # Only add parameters that are explicitly set
                if vad_params.disabled is not None:
                    vad_config.disabled = vad_params.disabled
                    has_vad_settings = True

                if vad_params.start_sensitivity:
                    vad_config.start_of_speech_sensitivity = vad_params.start_sensitivity
                    has_vad_settings = True

                if vad_params.end_sensitivity:
                    vad_config.end_of_speech_sensitivity = vad_params.end_sensitivity
                    has_vad_settings = True

                if vad_params.prefix_padding_ms is not None:
                    vad_config.prefix_padding_ms = vad_params.prefix_padding_ms
                    has_vad_settings = True

                if vad_params.silence_duration_ms is not None:
                    vad_config.silence_duration_ms = vad_params.silence_duration_ms
                    has_vad_settings = True

                # Only add automatic_activity_detection if we have VAD settings
                if has_vad_settings:
                    config.realtime_input_config = RealtimeInputConfig(
                        automatic_activity_detection=vad_config
                    )

            # Add system instruction to configuration, if provided
            system_instruction = self._system_instruction or ""
            if self._context and hasattr(self._context, "extract_system_instructions"):
                system_instruction += "\n" + self._context.extract_system_instructions()
            if system_instruction:
                logger.debug(f"Setting system instruction: {system_instruction}")
                config.system_instruction = system_instruction

            # Add tools to configuration, if provided
            if self._tools:
                logger.debug(f"Setting tools: {self._tools}")
                config.tools = self.get_llm_adapter().from_standard_tools(self._tools)

            # Start the connection
            self._connection_task = self.create_task(self._connection_task_handler(config=config))

        except Exception as e:
            await self.push_error(ErrorFrame(error=f"{self} Initialization error: {e}", fatal=True))

    async def _connection_task_handler(self, config: LiveConnectConfig):
        async with self._client.aio.live.connect(model=self._model_name, config=config) as session:
            logger.info("Connected to Gemini service")

            # Mark connection start time
            self._connection_start_time = time.time()

            await self._handle_session_ready(session)

            while True:
                try:
                    turn = self._session.receive()
                    async for message in turn:
                        # Reset failure counter if connection has been stable
                        self._check_and_reset_failure_counter()

                        if message.server_content and message.server_content.model_turn:
                            await self._handle_msg_model_turn(message)
                        elif (
                            message.server_content
                            and message.server_content.turn_complete
                            and message.usage_metadata
                        ):
                            await self._handle_msg_turn_complete(message)
                            await self._handle_msg_usage_metadata(message)
                        elif message.server_content and message.server_content.input_transcription:
                            await self._handle_msg_input_transcription(message)
                        elif message.server_content and message.server_content.output_transcription:
                            await self._handle_msg_output_transcription(message)
                        elif message.server_content and message.server_content.grounding_metadata:
                            await self._handle_msg_grounding_metadata(message)
                        elif message.tool_call:
                            await self._handle_msg_tool_call(message)
                        elif message.session_resumption_update:
                            self._handle_msg_resumption_update(message)
                except Exception as e:
                    if not self._disconnecting:
                        should_reconnect = await self._handle_connection_error(e)
                        if should_reconnect:
                            await self._reconnect()
                            return  # Exit this connection handler, _reconnect will start a new one
                    break

    def _check_and_reset_failure_counter(self):
        """Check if connection has been stable long enough to reset the failure counter.

        If the connection has been active for longer than the established threshold
        and there are accumulated failures, reset the counter to 0.
        """
        if (
            self._connection_start_time
            and self._consecutive_failures > 0
            and time.time() - self._connection_start_time >= CONNECTION_ESTABLISHED_THRESHOLD
        ):
            logger.info(
                f"Connection stable for {CONNECTION_ESTABLISHED_THRESHOLD}s, "
                f"resetting failure counter from {self._consecutive_failures} to 0"
            )
            self._consecutive_failures = 0

    async def _handle_connection_error(self, error: Exception) -> bool:
        """Handle a connection error and determine if reconnection should be attempted.

        Args:
            error: The exception that caused the connection error.

        Returns:
            True if reconnection should be attempted, False if a fatal error should be pushed.
        """
        self._consecutive_failures += 1
        logger.warning(
            f"Connection error (failure {self._consecutive_failures}/{MAX_CONSECUTIVE_FAILURES}): {error}"
        )

        if self._consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
            logger.error(
                f"Max consecutive failures ({MAX_CONSECUTIVE_FAILURES}) reached, "
                "treating as fatal error"
            )
            await self.push_error(
                ErrorFrame(error=f"{self} Error in receive loop: {error}", fatal=True)
            )
            return False
        else:
            logger.info(
                f"Attempting reconnection ({self._consecutive_failures}/{MAX_CONSECUTIVE_FAILURES})"
            )
            return True

    async def _reconnect(self):
        """Reconnect to Gemini Live API."""
        await self._disconnect()
        await self._connect(session_resumption_handle=self._session_resumption_handle)

    async def _disconnect(self):
        """Disconnect from Gemini Live API and clean up resources."""
        logger.info("Disconnecting from Gemini service")
        try:
            self._disconnecting = True
            await self.stop_all_metrics()
            if self._connection_task:
                await self.cancel_task(self._connection_task, timeout=1.0)
                self._connection_task = None
            if self._session:
                await self._session.close()
                self._session = None
            self._disconnecting = False
        except Exception as e:
            logger.error(f"{self} error disconnecting: {e}")

    async def _send_user_audio(self, frame):
        """Send user audio frame to Gemini Live API."""
        if self._audio_input_paused or self._disconnecting or not self._session:
            return

        # Send all audio to Gemini
        try:
            await self._session.send_realtime_input(
                audio=Blob(data=frame.audio, mime_type=f"audio/pcm;rate={frame.sample_rate}")
            )
        except Exception as e:
            await self._handle_send_error(e)

        # Manage a buffer of audio to use for transcription
        audio = frame.audio
        if self._user_is_speaking:
            self._user_audio_buffer.extend(audio)
        else:
            # Keep 1/2 second of audio in the buffer even when not speaking
            # (2 bytes per sample for 16-bit PCM).
            self._user_audio_buffer.extend(audio)
            length = int((frame.sample_rate * frame.num_channels * 2) * 0.5)
            self._user_audio_buffer = self._user_audio_buffer[-length:]

    async def _send_user_text(self, text: str):
        """Send user text via Gemini Live API's realtime input stream.

        This method sends text through the realtimeInput stream (via TextInputMessage)
        rather than the clientContent stream. This ensures text input is synchronized
        with audio and video inputs, preventing temporal misalignment that can occur
        when different modalities are processed through separate API pathways.

        For realtimeInput, turn completion is automatically inferred by the API based
        on user activity, so no explicit turnComplete signal is needed.

        Args:
            text: The text to send as user input.
        """
        if self._disconnecting or not self._session:
            return

        try:
            await self._session.send_realtime_input(text=text)
        except Exception as e:
            await self._handle_send_error(e)

    async def _send_user_video(self, frame):
        """Send user video frame to Gemini Live API."""
        if self._video_input_paused or self._disconnecting or not self._session:
            return

        now = time.time()
        if now - self._last_sent_time < 1:
            return  # Ignore if less than 1 second has passed

        self._last_sent_time = now  # Update last sent time
        logger.debug(f"Sending video frame to Gemini: {frame}")

        buffer = io.BytesIO()
        Image.frombytes(frame.format, frame.size, frame.image).save(buffer, format="JPEG")
        data = base64.b64encode(buffer.getvalue()).decode("utf-8")

        try:
            await self._session.send_realtime_input(video=Blob(data=data, mime_type="image/jpeg"))
        except Exception as e:
            await self._handle_send_error(e)

    async def _create_initial_response(self):
        """Create initial response based on context history."""
        if self._disconnecting:
            return

        if not self._session:
            self._run_llm_when_session_ready = True
            return

        messages = self._context.get_messages_for_initializing_history()
        if not messages:
            return

        logger.debug(f"Creating initial response: {messages}")

        await self.start_ttfb_metrics()

        try:
            await self._session.send_client_content(
                turns=messages, turn_complete=self._inference_on_context_initialization
            )
        except Exception as e:
            await self._handle_send_error(e)

        # If we're not generating a response right away upon initializing the
        # conversation history, set a flag saying that we need a turn complete
        # message when the user stops speaking.
        if not self._inference_on_context_initialization:
            self._needs_turn_complete_message = True

    async def _create_single_response(self, messages_list):
        """Create a single response from a list of messages."""
        if self._disconnecting or not self._session:
            return

        # Create a throwaway context just for the purpose of getting messages
        # in the right format
        context = GeminiLiveContext.upgrade(OpenAILLMContext(messages=messages_list))
        messages = context.get_messages_for_initializing_history()

        if not messages:
            return

        logger.debug(f"Creating response: {messages}")

        await self.start_ttfb_metrics()

        try:
            await self._session.send_client_content(turns=messages, turn_complete=True)
        except Exception as e:
            await self._handle_send_error(e)

    @traced_gemini_live(operation="llm_tool_result")
    async def _tool_result(self, tool_result_message):
        """Send tool result back to the API."""
        if self._disconnecting or not self._session:
            return

        # For now we're shoving the name into the tool_call_id field, so this
        # will work until we revisit that.
        id = tool_result_message.get("tool_call_id")
        name = tool_result_message.get("tool_call_name")
        result = json.loads(tool_result_message.get("content") or "")
        response = FunctionResponse(name=name, id=id, response=result)

        try:
            await self._session.send_tool_response(function_responses=response)
        except Exception as e:
            await self._handle_send_error(e)

    @traced_gemini_live(operation="llm_setup")
    async def _handle_session_ready(self, session: AsyncSession):
        """Handle the session being ready."""
        self._session = session
        # If we were just waiting for the session to be ready to run the LLM,
        # do that now.
        if self._run_llm_when_session_ready:
            self._run_llm_when_session_ready = False
            await self._create_initial_response()

    async def _handle_msg_model_turn(self, msg: LiveServerMessage):
        """Handle the model turn message."""
        parts = msg.server_content.model_turn.parts
        if not parts:
            return
        part = parts[0]

        await self.stop_ttfb_metrics()

        # part.text is added when `modalities` is set to TEXT; otherwise, it's None
        text = part.text
        if text:
            if not self._bot_text_buffer:
                await self.push_frame(LLMFullResponseStartFrame())

            self._bot_text_buffer += text
            self._search_result_buffer += text  # Also accumulate for grounding
            await self.push_frame(LLMTextFrame(text=text))

        # Check for grounding metadata in server content
        if msg.server_content and msg.server_content.grounding_metadata:
            self._accumulated_grounding_metadata = msg.server_content.grounding_metadata

        inline_data = part.inline_data
        if not inline_data:
            return

        # Check if mime type matches expected format
        expected_mime_type = f"audio/pcm;rate={self._sample_rate}"
        if inline_data.mime_type == expected_mime_type:
            # Perfect match, continue processing
            pass
        elif inline_data.mime_type == "audio/pcm":
            # Sample rate not provided in mime type, assume default
            if not hasattr(self, "_sample_rate_warning_logged"):
                logger.warning(
                    f"Sample rate not provided in mime type '{inline_data.mime_type}', assuming rate of {self._sample_rate}"
                )
                self._sample_rate_warning_logged = True
        else:
            # Unrecognized format
            logger.warning(f"Unrecognized server_content format {inline_data.mime_type}")
            return

        audio = inline_data.data
        if not audio:
            return

        if not self._bot_is_speaking:
            await self._set_bot_is_speaking(True)
            await self.push_frame(TTSStartedFrame())
            await self.push_frame(LLMFullResponseStartFrame())

        self._bot_audio_buffer.extend(audio)
        frame = TTSAudioRawFrame(
            audio=audio,
            sample_rate=self._sample_rate,
            num_channels=1,
        )
        await self.push_frame(frame)

    @traced_gemini_live(operation="llm_tool_call")
    async def _handle_msg_tool_call(self, message: LiveServerMessage):
        """Handle tool call messages."""
        function_calls = message.tool_call.function_calls
        if not function_calls:
            return
        if not self._context:
            logger.error("Function calls are not supported without a context object.")
            return

        function_calls_llm = [
            FunctionCallFromLLM(
                context=self._context,
                tool_call_id=(
                    # NOTE: when using Vertex AI we don't get server-provided
                    # tool call IDs here
                    f.id or str(uuid.uuid4())
                ),
                function_name=f.name,
                arguments=f.args,
            )
            for f in function_calls
        ]

        await self.run_function_calls(function_calls_llm)

    @traced_gemini_live(operation="llm_response")
    async def _handle_msg_turn_complete(self, message: LiveServerMessage):
        """Handle the turn complete message."""
        await self._set_bot_is_speaking(False)
        text = self._bot_text_buffer

        # Tracing of the complete LLM response is handled by the decorator, which
        # extracts the output text and usage metadata from the message.

        self._bot_text_buffer = ""
        self._llm_output_buffer = ""

        # Process grounding metadata if we have accumulated any
        if self._accumulated_grounding_metadata:
            await self._process_grounding_metadata(
                self._accumulated_grounding_metadata, self._search_result_buffer
            )

        # Reset grounding tracking for next response
        self._search_result_buffer = ""
        self._accumulated_grounding_metadata = None

        # Only push the TTSStoppedFrame if the bot was outputting audio. When
        # text is buffered, modalities is set to TEXT and no audio is produced.
        if not text:
            await self.push_frame(TTSStoppedFrame())

        await self.push_frame(LLMFullResponseEndFrame())

    @traced_stt
    async def _handle_user_transcription(
        self, transcript: str, is_final: bool, language: Optional[Language] = None
    ):
        """Handle a transcription result with tracing."""
        pass

    async def _handle_msg_input_transcription(self, message: LiveServerMessage):
        """Handle the input transcription message.

        Gemini Live sends user transcriptions as either single words or multi-word
        phrases, so we have to aggregate the input transcription. This handler
        aggregates the text into sentences, splitting on end-of-sentence markers.
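
        For example, the chunks " Hello", " there.", and " How" accumulate until
        `match_endofsentence` finds the period: "Hello there." is pushed as a
        TranscriptionFrame and "How" stays buffered for the next chunk.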
        """
        if not message.server_content.input_transcription:
            return

        text = message.server_content.input_transcription.text

        if not text:
            return

        # Strip leading space from sentence starts if buffer is empty
        if text.startswith(" ") and not self._user_transcription_buffer:
            text = text.lstrip()

        # Accumulate text in the buffer
        self._user_transcription_buffer += text

        # Check for complete sentences
        while True:
            eos_end_marker = match_endofsentence(self._user_transcription_buffer)
            if not eos_end_marker:
                break

            # Extract the complete sentence
            complete_sentence = self._user_transcription_buffer[:eos_end_marker]
            # Keep the remainder for the next chunk
            self._user_transcription_buffer = self._user_transcription_buffer[eos_end_marker:]

            # Send a TranscriptionFrame with the complete sentence
            logger.debug(f"[Transcription:user] [{complete_sentence}]")
            await self._handle_user_transcription(complete_sentence, True, self._language)
            await self.push_frame(
                TranscriptionFrame(
                    text=complete_sentence,
                    user_id="",
                    timestamp=time_now_iso8601(),
                    result=message,
                ),
                FrameDirection.UPSTREAM,
            )

    async def _handle_msg_output_transcription(self, message: LiveServerMessage):
        """Handle the output transcription message."""
        if not message.server_content.output_transcription:
            return

        # This is the output transcription text when modalities is set to AUDIO.
        # In this case, we push LLMTextFrame and TTSTextFrame to be handled by the
        # downstream assistant context aggregator.
        text = message.server_content.output_transcription.text

        if not text:
            return

        # Accumulate text for grounding as well
        self._search_result_buffer += text

        # Check for grounding metadata in server content
        if message.server_content and message.server_content.grounding_metadata:
            self._accumulated_grounding_metadata = message.server_content.grounding_metadata
        # Collect text for tracing
        self._llm_output_buffer += text

        await self.push_frame(LLMTextFrame(text=text))
        await self.push_frame(TTSTextFrame(text=text))

    async def _handle_msg_grounding_metadata(self, message: LiveServerMessage):
        """Handle dedicated grounding metadata messages."""
        if message.server_content and message.server_content.grounding_metadata:
            grounding_metadata = message.server_content.grounding_metadata
            # Process the grounding metadata immediately
            await self._process_grounding_metadata(grounding_metadata, self._search_result_buffer)

    async def _process_grounding_metadata(
        self, grounding_metadata: GroundingMetadata, search_result: str = ""
    ):
        """Process grounding metadata and emit LLMSearchResponseFrame."""
        if not grounding_metadata:
            return

        # Extract rendered content for search suggestions
        rendered_content = None
        if (
            grounding_metadata.search_entry_point
            and grounding_metadata.search_entry_point.rendered_content
        ):
            rendered_content = grounding_metadata.search_entry_point.rendered_content

        # Convert grounding chunks and supports to LLMSearchOrigin format
        origins = []

        if grounding_metadata.grounding_chunks and grounding_metadata.grounding_supports:
            # Create a mapping of chunk indices to origins
            chunk_to_origin: Dict[int, LLMSearchOrigin] = {}

            for index, chunk in enumerate(grounding_metadata.grounding_chunks):
                if chunk.web:
                    origin = LLMSearchOrigin(
                        site_uri=chunk.web.uri, site_title=chunk.web.title, results=[]
                    )
                    chunk_to_origin[index] = origin
                    origins.append(origin)

            # Add grounding support results to the appropriate origins
            for support in grounding_metadata.grounding_supports:
                if support.segment and support.grounding_chunk_indices:
                    text = support.segment.text or ""
                    confidence_scores = support.confidence_scores or []

                    # Add this result to all origins referenced by this support
                    for chunk_index in support.grounding_chunk_indices:
                        if chunk_index in chunk_to_origin:
                            result = LLMSearchResult(text=text, confidence=confidence_scores)
                            chunk_to_origin[chunk_index].results.append(result)

        # Create and push the search response frame
        search_frame = LLMSearchResponseFrame(
            search_result=search_result, origins=origins, rendered_content=rendered_content
        )

        await self.push_frame(search_frame)

    async def _handle_msg_usage_metadata(self, message: LiveServerMessage):
        """Handle the usage metadata message."""
        if not message.usage_metadata:
            return

        usage = message.usage_metadata

        # Ensure we have valid integers for all token counts
        prompt_tokens = usage.prompt_token_count or 0
        completion_tokens = usage.response_token_count or 0
        total_tokens = usage.total_token_count or (prompt_tokens + completion_tokens)

        tokens = LLMTokenUsage(
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=total_tokens,
        )

        await self.start_llm_usage_metrics(tokens)

    def _handle_msg_resumption_update(self, message: LiveServerMessage):
        update = message.session_resumption_update
        if update.resumable and update.new_handle:
            self._session_resumption_handle = update.new_handle

    async def _handle_send_error(self, error: Exception):
        # In server-to-server contexts, a WebSocket error should be quite rare.
        # Given how hard it is to recover from a send-side error with proper
        # state management, and that exponential backoff for retries can have
        # cost/stability implications for a service cluster, let's just treat a
        # send-side error as fatal.
        if not self._disconnecting:
            await self.push_error(ErrorFrame(error=f"{self} Send error: {error}", fatal=True))

    def create_context_aggregator(
        self,
        context: OpenAILLMContext,
        *,
        user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(),
        assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(),
    ) -> GeminiLiveContextAggregatorPair:
        """Create an instance of GeminiLiveContextAggregatorPair from an OpenAILLMContext.

        Constructor keyword arguments for both the user and assistant aggregators can be provided.

        Args:
            context: The LLM context to use.
            user_params: User aggregator parameters. Defaults to LLMUserAggregatorParams().
            assistant_params: Assistant aggregator parameters. Defaults to LLMAssistantAggregatorParams().

        Returns:
            GeminiLiveContextAggregatorPair: A pair of context aggregators, one
            for the user and one for the assistant, encapsulated in a
            GeminiLiveContextAggregatorPair.
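
        Example (a sketch assuming a `Pipeline` and `transport` from the
        surrounding application)::

            context = OpenAILLMContext([{"role": "user", "content": "Hello!"}])
            aggregators = llm.create_context_aggregator(context)
            pipeline = Pipeline([
                transport.input(),
                aggregators.user(),
                llm,
                transport.output(),
                aggregators.assistant(),
            ])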
        """
        context.set_llm_adapter(self.get_llm_adapter())

        GeminiLiveContext.upgrade(context)
        user = GeminiLiveUserContextAggregator(context, params=user_params)

        assistant_params.expect_stripped_words = False
        assistant = GeminiLiveAssistantContextAggregator(context, params=assistant_params)
        return GeminiLiveContextAggregatorPair(_user=user, _assistant=assistant)
